package com.yxsk.relay.job.admin.core.executor.quartz;

import com.yxsk.relay.job.admin.core.collector.QuartzAsyncJobResultCollector;
import com.yxsk.relay.job.admin.core.executor.listener.JobExecuteListener;
import com.yxsk.relay.job.admin.core.router.endpoint.EndpointRouterInfo;
import com.yxsk.relay.job.admin.core.router.endpoint.SimpleEndpointRouterInfo;
import com.yxsk.relay.job.admin.core.schedule.quartz.QuartzDynamicScheduleService;
import com.yxsk.relay.job.admin.core.schedule.quartz.vo.JobConfig;
import com.yxsk.relay.job.admin.core.trigger.AsyncDynamicTrigger;
import com.yxsk.relay.job.admin.data.entity.JobExecuteLog;
import com.yxsk.relay.job.admin.data.entity.vo.QuartzJobInfo;
import com.yxsk.relay.job.admin.data.service.JobDetailService;
import com.yxsk.relay.job.admin.data.service.JobExecuteLogService;
import com.yxsk.relay.job.admin.data.service.JobTriggerLogService;
import com.yxsk.relay.job.component.admin.monitor.AsyncJobStatusMonitor;
import com.yxsk.relay.job.component.admin.monitor.DefaultAsyncJobMonitor;
import com.yxsk.relay.job.component.admin.monitor.callback.MonitorEventCallback;
import com.yxsk.relay.job.component.admin.utils.SpringBeanUtils;
import com.yxsk.relay.job.component.common.constant.NetProtocol;
import com.yxsk.relay.job.component.common.constant.UriConstant;
import com.yxsk.relay.job.component.common.exception.RelayJobRuntimeException;
import com.yxsk.relay.job.component.common.protocol.caller.RemoteCaller;
import com.yxsk.relay.job.component.common.protocol.message.base.ResultResponse;
import com.yxsk.relay.job.component.common.protocol.message.execute.JobExecuteRequest;
import com.yxsk.relay.job.component.common.protocol.message.execute.JobExecuteResponse;
import com.yxsk.relay.job.component.common.utils.DateUtils;
import com.yxsk.relay.job.component.common.utils.ExceptionUtils;
import com.yxsk.relay.job.component.common.vo.Endpoint;
import com.yxsk.relay.job.component.common.vo.ExecuteResult;
import lombok.*;
import org.springframework.util.CollectionUtils;

import java.util.List;

/**
 * @Author 11376
 * @CreaTime 2019/6/28 10:34
 * @Description
 */
public class QuartzAsyncRemoteJobExecutor extends QuartzRemoteJobExecutor {

    protected AsyncJobStatusMonitor jobStatusMonitor;

    protected QuartzAsyncJobResultCollector.ExecuteCompleteCallback completeCallback;

    public QuartzAsyncRemoteJobExecutor(JobExecuteListener executeListener, AsyncJobStatusMonitor jobStatusMonitor) {
        super(executeListener);
        this.jobStatusMonitor = jobStatusMonitor;
    }

    public QuartzAsyncRemoteJobExecutor(JobExecuteListener executeListener,
                                        AsyncJobStatusMonitor jobStatusMonitor,
                                        QuartzAsyncJobResultCollector.ExecuteCompleteCallback completeCallback) {
        super(executeListener);
        this.jobStatusMonitor = jobStatusMonitor;
        this.completeCallback = completeCallback;
    }

    @Override
    protected void beforeRemoteCall(EndpointRouterInfo endpointRouterInfo, JobExecuteRequest executeRequest) {
        super.beforeRemoteCall(endpointRouterInfo, executeRequest);

        // 执行中
        AsyncJobStatusMonitor.MonitorRequest monitorRequest = new AsyncJobStatusMonitor.MonitorRequest();
        monitorRequest.setExecuteJobId(executeRequest.getSerialNo());
        Endpoint endpoint = endpointRouterInfo.getEndpoint();
        monitorRequest.setHost(endpoint.getHost());
        monitorRequest.setPort(endpoint.getPort());
        monitorRequest.setToken(endpoint.getAuthToken());
        monitorRequest.setResultCallback(buildCallbackInfo(endpointRouterInfo, executeRequest));
        Long executeTimeout = this.context.getJobConfig().getExecuteTimeout();
        if (executeTimeout != null && executeTimeout > 0) {
            this.jobStatusMonitor.monitor(monitorRequest, executeTimeout, false);
        } else {
            this.jobStatusMonitor.monitor(monitorRequest, false);
        }
    }

    @Override
    protected void handleRemoteExecuteError(JobExecuteRequest request, Exception e) {
        // 放弃任务监控
        this.jobStatusMonitor.giveUpMonitor(request.getSerialNo());

        // 处理异常
        super.handleRemoteExecuteError(request, e);
    }

    @Override
    protected final void handleExecuteResponse(EndpointRouterInfo endpointRouterInfo, JobExecuteRequest executeRequest, ResultResponse<JobExecuteResponse> response) {
        if (response == null) {
            throw new RelayJobRuntimeException("执行响应为空");
        }

        if (ResultResponse.SUCCESS_CODE.equals(response.getCode())) {
            // 判断任务触发结果是否成功
            JobExecuteResponse executeResponse = response.getData();
            if (ExecuteResult.ExecuteResultCode.EXECUTING.getCode().equals(executeResponse.getExecuteStatus())) {
                // 开始监控
                this.jobStatusMonitor.startMonitor(executeRequest.getSerialNo());
                return;
            }
        } else {
            this.doJobTriggerFailed(endpointRouterInfo, executeRequest, response);
        }

        // 放弃任务监控
        this.jobStatusMonitor.giveUpMonitor(executeRequest.getSerialNo());
    }

    @Override
    protected String buildExecuteUri() {
        Endpoint endpoint = this.context.getEndpointRouterInfo().getEndpoint();
        return new StringBuilder("http://").append(endpoint.getHost()).append(":").append(endpoint.getPort()).append(UriConstant.TASK_ASYNC_EXECUTE_URI).toString();
    }

    private QuartzAsyncJobCallback buildCallbackInfo(EndpointRouterInfo endpointRouterInfo, JobExecuteRequest executeRequest) {
        return new QuartzAsyncJobCallback(QuartzAsyncRemoteJobExecutor.AsyncMonitorContext.builder()
                .endpointRouterInfo(endpointRouterInfo)
                .jobConfig(this.context.getJobConfig())
                .executeParam(this.context.getTriggerInfo().getTriggerParam())
                .executeRequest(executeRequest)
                .build());
    }

    protected void doJobTriggerFailed(EndpointRouterInfo endpointRouterInfo, JobExecuteRequest executeRequest, ResultResponse<JobExecuteResponse> response) {
        // 修改执行日志
        JobExecuteLogService executeLogService = SpringBeanUtils.getBean(JobExecuteLogService.class);
        JobExecuteLog executeLog = executeLogService.findById(executeRequest.getSerialNo());
        executeLog.setEndTime(DateUtils.getCurrentDate());
        executeLog.setExecuteStatus(ExecuteResult.ExecuteResultCode.ERROR.getCode());
        executeLog.setExecuteErrorMessage("远程任务执行错误, 响应信息：" + response.toString());

        executeLogService.updateById(executeLog);

        doExecuteComplete();
    }

    protected void doJobFinishCallback(QuartzAsyncRemoteJobExecutor.AsyncMonitorContext context, JobExecuteResponse executeResponse) {
        this.handleExecuteResult(context.getExecuteRequest().getSerialNo(), executeResponse);

        doExecuteComplete();
    }

    protected void doJobErrorCallback(QuartzAsyncRemoteJobExecutor.AsyncMonitorContext context, Throwable e) {
        // 修改执行日志
        JobExecuteLogService executeLogService = SpringBeanUtils.getBean(JobExecuteLogService.class);
        JobExecuteLog executeLog = executeLogService.findById(context.getExecuteRequest().getSerialNo());
        executeLog.setEndTime(DateUtils.getCurrentDate());
        executeLog.setExecuteStatus(ExecuteResult.ExecuteResultCode.ERROR.getCode());
        executeLog.setExecuteErrorMessage(ExceptionUtils.getStackInfo(e));

        executeLogService.updateById(executeLog);

        doExecuteComplete();
    }

    protected void doExecuteTimeout(QuartzAsyncRemoteJobExecutor.AsyncMonitorContext context) {
        // 修改执行日志
        JobExecuteLogService executeLogService = SpringBeanUtils.getBean(JobExecuteLogService.class);
        JobExecuteLog executeLog = executeLogService.findById(context.getExecuteRequest().getSerialNo());
        executeLog.setEndTime(DateUtils.getCurrentDate());
        executeLog.setExecuteStatus(ExecuteResult.ExecuteResultCode.EXECUTE_TIMEOUT.getCode());
        executeLog.setExecuteErrorMessage("任务执行超时");

        executeLogService.updateById(executeLog);

        doExecuteComplete();
    }

    protected void doExecuteComplete() {
        if (this.completeCallback != null) {
            this.completeCallback.callback();
        }
    }

    @AllArgsConstructor
    public class QuartzAsyncJobCallback implements MonitorEventCallback {

        private QuartzAsyncRemoteJobExecutor.AsyncMonitorContext context;

        @Override
        public void onFinish(String jobId, JobExecuteResponse executeResponse) {
            /**
             * 任务完成时回调 {@link AsyncDynamicTrigger#doJobFinishCallback(AsyncDynamicTrigger.AsyncMonitorContext, JobExecuteResponse)} 方法
             */
            doJobFinishCallback(this.context, executeResponse);
        }

        @Override
        public void onError(String jobId, Throwable e) {
            /**
             * 任务异常时回调 {@link AsyncDynamicTrigger#doJobErrorCallback(AsyncDynamicTrigger.AsyncMonitorContext, Throwable)}
             */
            doJobErrorCallback(this.context, e);
        }

        @Override
        public void onExecuteTimeout(String jobId) {
            doExecuteTimeout(this.context);
        }

    }


    public static class QuartzAsyncJobMonitor extends DefaultAsyncJobMonitor {

        public QuartzAsyncJobMonitor(RemoteCaller remoteCaller) {
            super(remoteCaller);
        }

        @Override
        public void reloadExecutingJob() {
            JobExecuteLogService executeLogService = SpringBeanUtils.getBean(JobExecuteLogService.class);
            JobDetailService detailService = SpringBeanUtils.getBean(JobDetailService.class);
            JobTriggerLogService triggerLogService = SpringBeanUtils.getBean(JobTriggerLogService.class);
            List<JobExecuteLog> logs = executeLogService.findAllExecutingLog();
            if (!CollectionUtils.isEmpty(logs)) {
                logs.stream().forEach(jobExecuteLog -> {
                    EndpointRouterInfo endpointRouterInfo = loadEndpoint(jobExecuteLog);
                    QuartzJobInfo jobConfig = detailService.getJobConfig(triggerLogService.findById(jobExecuteLog.getTriggerLogId()).getJobId());
                    JobExecuteRequest request = buildExecuteRequest(jobExecuteLog, jobConfig);
                    ResultResponse<JobExecuteResponse> response = buildResponse(request);
                    this.handleExecuteResponse(endpointRouterInfo, request, jobConfig, response);
                });
            }
            super.reloadExecutingJob();
        }

        private void handleExecuteResponse(EndpointRouterInfo endpointRouterInfo, JobExecuteRequest request, JobConfig jobConfig, ResultResponse<JobExecuteResponse> response) {
            // 执行中
            AsyncJobStatusMonitor.MonitorRequest monitorRequest = new AsyncJobStatusMonitor.MonitorRequest();
            monitorRequest.setExecuteJobId(request.getSerialNo());
            Endpoint endpoint = endpointRouterInfo.getEndpoint();
            monitorRequest.setHost(endpoint.getHost());
            monitorRequest.setPort(endpoint.getPort());
            monitorRequest.setToken(endpoint.getAuthToken());
            // 获取执行监听
            JobExecuteListener executeListener = SpringBeanUtils.getBean(QuartzDynamicScheduleService.class).getExecuteManager().jobExecuteListener();
            // 注入执行回调
            monitorRequest.setResultCallback(new QuartzAsyncRemoteJobExecutor(executeListener, this).new QuartzAsyncJobCallback(AsyncMonitorContext.builder()
                    .endpointRouterInfo(endpointRouterInfo)
                    .jobConfig(jobConfig)
                    .executeParam(request.getParam())
                    .executeRequest(request)
                    .build()));
            Long executeTimeout = jobConfig.getExecuteTimeout();
            if (executeTimeout != null && executeTimeout > 0) {
                this.monitor(monitorRequest, executeTimeout);
            } else {
                this.monitor(monitorRequest);
            }
        }

        private ResultResponse<JobExecuteResponse> buildResponse(JobExecuteRequest request) {
            JobExecuteResponse resp = new JobExecuteResponse();
            resp.setExecuteStatus(ExecuteResult.ExecuteResultCode.EXECUTING.getCode());
            resp.setRequestSerialNo(request.getSerialNo());
            return (ResultResponse<JobExecuteResponse>) ResultResponse.ok(resp);
        }

        private JobExecuteRequest buildExecuteRequest(JobExecuteLog jobExecuteLog, QuartzJobInfo jobConfig) {
            JobExecuteRequest request = new JobExecuteRequest();
            request.setSerialNo(jobExecuteLog.getId());
            request.setJobId(jobConfig.getId());
            request.setHandleName(jobConfig.getHandlerName());
            request.setRequestTime(jobExecuteLog.getBeginTime());
            request.setParam(jobExecuteLog.getExecuteParam());
            return request;
        }

        private EndpointRouterInfo loadEndpoint(JobExecuteLog jobExecuteLog) {
            EndpointRouterInfo endpointRouterInfo = new SimpleEndpointRouterInfo();
            Endpoint endpoint = new Endpoint();
            endpoint.setAuthToken(jobExecuteLog.getAuthToken());
            endpoint.setHost(jobExecuteLog.getEndpointIp());
            endpoint.setPort(jobExecuteLog.getEndpointPort());
            endpoint.setProtocol(NetProtocol.getProtocol(jobExecuteLog.getNetProtocol()));

            endpointRouterInfo.setEndpoint(endpoint);
            return endpointRouterInfo;
        }
    }

    @Builder
    @Getter
    @Setter
    @ToString
    public static class AsyncMonitorContext {
        private EndpointRouterInfo endpointRouterInfo;
        private JobConfig jobConfig;
        private String executeParam;
        private JobExecuteRequest executeRequest;
    }

}
